iT邦幫忙

2024 iThome 鐵人賽

DAY 16
0
Software Development

從零開始構建能理解語義的 Linebot 架構系列 第 16

Kafka 概念介紹及部署: 在AWS EC2使用Docker部署Kafka及設定遠端連線

  • 分享至 

  • xImage
  •  

概述

  • 可以使用官方提供的Docker image,使用docker-compose執行kafka。
  • 必須設定好Listner及Advertised Listeners,Client端在初始連線時,拿到的是Advertised Listeners提供的位址。
  • 相關連線設定可以見本文的docker-compose範例: 在AWS EC2 上設定Advertised Listeners (含外網 / 內網Listener)

使用Docker部署Kafka及設定遠端連線

在之前的系列文章中,們已經介紹了 Kafka 的特性、角色與運作方式。講完了理論,這篇文章我們回到實作面,以我們的專案情境為例,說明如何使用 Docker 執行 Kafka。

透過Docker部署Kafka非常方便,Kafka的Docker Hub提供了現成的映像檔(Image),只要設定基本環境並啟動容器(Conatiner)即可。

然而,啟動Kafka很簡單,但要讓別人順利的連線到Kafka,這中間的眉眉角角可能比你我想像中的還多。我在設定過程中著實卡了一陣子,希望這篇文章可以幫大家少繞一點路。

快速解法

如果你只是想讓遠端的Client 連上EC2上的 Kafka broker,只需要在docker-compose中的environment加上這一行:

kafka:
    ...
    environment:
        - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://你的 EC2 IP 或外部主機名稱:9092

並且確保你的docker和EC2 security group開放 9092 port,一切搞定。

連線設定的問題

  • 這個問題在Stack Overflow上有很多類似的討論,可能因為大家實在問了太多次了,Confluent官方還發表了一篇文章,專門來解釋這其中的原理。

  • 在這篇文章中,我會整理如何讓遠端的 Producer / Consumer 連線到 EC2 上的 Kafka Broker,並解釋必須進行的相關設定。
    注: 如果使用的是 Confluent Cloud,則不需要設定 listener,這篇文章只針對使用docker-compose執行
    Kafka container的情況。

遠端連線流程與Listener

  • 當Client嘗試連線到 Kafka broker時,Client實際上是先向Kafka取得 meta data(包括 Leader Partition 的 broker 地址),再進行資料傳輸。

  • Listener 的設定分為 ListenersAdvertised Listeners 兩部分

    • Listeners:Kafka 綁定的IP和端口,預設情況下綁定 0.0.0.0,即監聽所有網絡接口。
    • Advertised Listeners:Kafka回應給Client端的連接方式,代表外部可訪問的IP或主機名稱。
  • 實作上,我們需要設定監聽0.0.0.0(監聽機器上所有interface)的Listener,以及配對的Advertised Listener。

  • 再接著說明細節之前,我們先介紹Leader partition和Follower partition,這兩個在連線中會參與的重要角色。

Leader Partition和Follower Partition

  • 當Client連線到Kafka時,實際上是透過 Leader Partition 來進行資料的讀取(Consumer)與寫入(Producer)。
  • 在Kafka中,當replication factor大於1時,每個Partition都會有多個複本(replica)來做備援,而這些複本會分為兩種角色:Leader Partition 和 Follower Partition。
  • 其中Leader Partition是唯一的,他負責處理所有對該 partition 的讀取與寫入操作。而這個 Leader Partition可能分布在Broker集群中的任何一台Broker上。
  • 所以當Client連接到Kafka時,首先需要取得的,就是Leader Partition所在Broker的連線資訊。
  • 注: 除了Leader partition以外的replica都是Follower partition,他們都不負責服務Consumer,只會從 Leader 複製資料(Message)。

遠端Client連接Kafka的流程

回到連線的過程,以下以我們專案的情境為例: 在Lambda Function中,使用KafkaJS實作的Producer。具體步驟如下:

https://ithelp.ithome.com.tw/upload/images/20240929/20105227mbkE34DrOH.png
圖: Client端先透過Initial connection,跟Broker拿到真正的連線方式(URL),對Kafka傳送訊息

  • 1.Initial connection: Client Request (Connect by Host IP)

    • Client問Broker Cluster(18.134.151.199:9092)可以連到Leader partition的Broker的連線方式
  • 2.Initial connection: Server Response(Response: meta data)

    • Broker回應meta data,裡面包含了目標Broker的連線方式(18.134.151.198:9093)
  • 3.Send Message

    • Client會從Broker給的連線方式,再連到目標Broker實際讀寫資料
  • 這邊要注意的是,我們在Client端指定的Kafka Host,只是一個可以拿到Meta data的位址。

  • Client可以透過步驟1~2 (Initial connection),去Broker Cluster取得Meta data。這份Meta Data裡面,才包含了實際要讀寫的Broker連線位址,也就是Leader Partition所在Broker位址。

  • 而這個Meta Data的內容,要在Kafka Broker(server端)上透過advertised.listeners 或者KAFKA_ADVERTISED_LISTENERS設定。如果沒設定的話,預設是回應broker所在的host name。

  • 所以如果要讓Client正確讀寫遠端broker的資料,需要在Kafka Broker config設定好連線方式,不然就會看到明明Client端可以成功connect到Broker Cluster,卻又出現連線到localhost:9092失敗這種奇怪的事情。

    • 因為沒設定目標broker連線方式,Server會叫Client去和預設值: localhost:9092做連線。

設定

Listeners

  • Kafka綁定的HostName / IP:port,如果完全不設定Listener的話,Kafka Default會有一個綁定0.0.0.0的Listener (監聽機器上所有interface)

Advertised Listeners

  • Advertised Listeners是客戶端連接的方式,格式也是HostName / IP: port
  • 一個綁定某Interface的Listener,可以設定他對應的Advertised Listener,當有Client連到這個Listener綁定的HostName / Ip:port時,回應的連線方式(Meta data),就是他對應的Advertised Listener的HostName / IP: port

設定Listener以及他對應的Advertised Listener

直接以一個docker-compose中的設定為例:

- KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:9092,EXTERNAL://ec2–1x–1xx–1xx–1xx.ap-southeast-1.compute.amazonaws.com:9093
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
  • 這邊設定了兩個Listener,名字分別叫做INTERNAL以及EXTERNAL,他們都使用PLAINTEXT(表示沒有加密)做連線的protocol,並監聽所有的interface(0.0.0.0),並且分別監聽90929093port

  • 當Client對broker的9092 port做initial connection時,broker會回應我們設定的localhost:9092。這是因為INTERNAL這個Listener有設定同名的ADVERTISED_LISTENER。

  • 而當Client對9093 port做initial connection時,同樣也會回應跟EXTERNAL同名的 ADVERTISED_LISTENER的url: ec2–1x–1xx–1xx–1xx.ap-southeast-1.compute.amazonaws.com:9093

  • 最後我們在KAFKA_INTER_BROKER_LISTENER_NAME指定使用INTERNAL作為broker之間溝通的Listener。

相關的docker環境變數說明

KAFKA_LISTENERS

  • 格式: {LISTENER_NAME}://{hostname}:{port}
  • 定義: 設定Listener name, ip/hostname, 以及port。
    • 其中Listener name是為了指定Client端連到kafka server端時,使用的加密protocol。可以直接用Kafka 支援的protocal,也可以給一個自訂的名稱,然後在KAFKA_LISTENER_SECURITY_PROTOCOL_MAP指定這個名稱所使用的protocol。

KAFKA_ADVERTISED_LISTENERS

  • 格式(同Listener): {LISTENER_NAME}://{hostname}:{port}
  • 代表當Client端連到某個Listener時,在iInitial connection會回應給Client的位址,以LISTENER_NAME當作配對的依據

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP

  • 把Listener name 對應到kafka 支援的四種加密protocol: PLAINTEXT(沒加密, 僅建議測試用), SSL, SASL_PLAINTEXT, SASL_SSL

KAFKA_INTER_BROKER_LISTENER_NAME

指定要使用哪一個Listner來讓Broker彼此溝通

其他眉角

  • 如果沒有設定任何Advertised Listener,Listener預設回應的Meta data,會是機器的Host Name。
  • 相反的,若只設定Advertised Listener,而未設定任何Listeners,該Advertised Listener將對應到kafka的預設Listener (0.0.0.0)。
  • Advertised Listener不可設定為0.0.0.0。

在AWS EC2上設定Advertised Listeners

需注意下面的原理以及限制:

  • 如上所述,如果完全不設定Advertised Listener,則在Initial connection時,Listener預設會回給Client 這台(AWS EC2)的host name。
  • 而由於EC2的hostname是只有AWS內網可以resolved的Internal hostname,會造成Client在Initial connection後,拿到的Meta data是一個連不到目標kafka broker的Hostname。因為Client端通常不在AWS內網裡面。

docker-compose範例:在AWS EC2上設定Advertised Listeners: (單一Listener)

docker-compose範例:在AWS EC2上設定Advertised Listeners: (單一Listener)
version: "2"
services:
  zookeeper:
    image: zookeeper:3.4
    mem_limit: 104857600
    container_name: zookeeper
    restart: unless-stopped
    ports:
      # exposing for debug reason
      - 2181:2181
    volumes:
      - "./zookeeper/data:/data"
      - "./zookeeper/datalog:/datalog"
      - "./zookeeper/zoo.cfg:/conf/zoo.cfg"
  kafka:
    image: 'bitnami/kafka:2.1.1'
    hostname: localhost
    container_name: kafka
    ports:
      - '9092:9092'
      # expose the external port for external clients
      - '9093:9093'
    environment:
      # Kafka defaults to the following jvm memory parameters which mean that kafka will allocate 1GB at startup and use a maximum of 1GB of memory
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms256m
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # set advertised listeners of default listener to the hostname which can be resolved both internally and externally
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://EC2的external hostname或IP:9092
    volumes:
      - KAFKA_VOLUMES:/bitnami/kafka
    depends_on:
      - zookeeper
volumes:
  KAFKA_VOLUMES:
  • 如果不需區分外網 / 內網Listener時,不用額外設定Listener, kafka 會有一個預設的Listener。只需要新增這個預設Listener的Advertised Listener,並且把值設定為EC2的對外host name。

docker-compose範例: 在AWS EC2 上設定Advertised Listeners (含外網 / 內網Listener)

docker-compose範例: 在AWS EC2 上設定Advertised Listeners (外網/內網Listener)
version: "2"
services:
  zookeeper:
    image: zookeeper:3.4
    mem_limit: 104857600
    container_name: zookeeper
    restart: unless-stopped
    ports:
      # exposing for debug reason
      - 2181:2181
    volumes:
      - "./zookeeper/data:/data"
      - "./zookeeper/datalog:/datalog"
      - "./zookeeper/zoo.cfg:/conf/zoo.cfg"
  kafka:
    image: 'bitnami/kafka:2.1.1'
    hostname: localhost
    container_name: kafka
    ports:
      - '9092:9092'
      # expose the external port for external clients
      - '9093:9093'
    environment:
      # Kafka defaults to the following jvm memory parameters which mean that kafka will allocate 1GB at startup and use a maximum of 1GB of memory
      - KAFKA_HEAP_OPTS=-Xmx512m -Xms256m
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      # set listeners and advertised listeners for external (9093) and internal(9092)
      - KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://localhost:9092,EXTERNAL://EC2的external hostname或IP:9093
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
    volumes:
      - KAFKA_VOLUMES:/bitnami/kafka
    depends_on:
      - zookeeper
volumes: 
  KAFKA_VOLUMES:

這邊要注意的是,除了要設定Listener對應的KAFKA_ADVERTISED_LISTENERS以外,
還必須設定KAFKA_INTER_BROKER_LISTENER_NAME,來告訴kafka,broker彼此之間的通訊要用哪一個listener。否則會出現下面的錯誤,即便你只設定了一個Listener也一樣:

ERROR Exiting Kafka due to fatal exception (kafka.Kafka$)
java.lang.IllegalArgumentException: requirement failed: inter.broker.listener.name must be a listener name defined in advertised.listeners. The valid options based on currently configured listeners are INTERNAL,EXTERNAL

測試: 使用kafka Producer Console 來對9093, 9092 port連線

使用Kafka提供的Producer程式,測試上面docker-compose的設定中,這兩個Port的連線:

  • 9093: 可正常連線
docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-producer.sh --broker-list kafka_broker_ip:9093 -topic TEST_TOPIC
  • 9092: 會連不到
docker exec -it kafka容器名稱 /opt/bitnami/kafka/bin/kafka-console-producer.sh - broker-list kafka broker ip:9092 - topic TEST_TOPIC

...ERROR Error when sending message to topic TEST_TOPIC with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.NetworkException: The server disconnected before a response was received.

以上就是連線到Kafka的設定方式。設定完成後,就可以讓專案中的AWS Lambda (Producer) 和 Bot Server(Consumer)使用Kafka作為Message Queue了。

本文修改自作者的blog文章: [Kafka] 設定Listener / Advertised Listener:讓Client遠端連線到AWS EC2上的Kafka)


上一篇
Kafka 概念介紹及部署: 以 Line Bot 專案為例圖解Rebalance
下一篇
Kafka 概念介紹及部署: 補充說明: Kafka Connect
系列文
從零開始構建能理解語義的 Linebot 架構30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言